TimeSeriesForestRegressor (TSF): interval features + tree ensemble

TimeSeriesForestRegressor (TSF): interval features + tree ensemble

The Time-Series Forest regressor is an interval-based ensemble:

  • Each tree samples random intervals from the input series.

  • On each interval it computes simple summary features (commonly mean, std, slope).

  • The tree learns a mapping from these features to the target.

  • The forest prediction is the average across trees.

This notebook implements a practical, sklearn-style TimeSeriesForestRegressor(min_interval=..., ...).

If you want a version where you provide your own feature function list, see ../composable_time_series_forest_regressor/overview.ipynb.

Core intuition (with math)

Let \(x \in \mathbb{R}^m\) be one univariate series of length \(m\) (e.g., a sliding window of past values).

For one tree \(b\):

  1. Sample \(K\) random intervals \(I_k = [s_k, e_k)\) with \(0 \le s_k < e_k \le m\).

  2. For each interval, compute features (examples):

\[\mu(I) = \frac{1}{|I|}\sum_{t \in I} x_t\]
\[\sigma(I) = \sqrt{\frac{1}{|I|}\sum_{t \in I} (x_t - \mu(I))^2}\]

For the slope, fit a least-squares line \(x_t \approx \alpha + \beta t\) over the interval indices \(t=0,\dots,|I|-1\) (we write the slope as \(\beta\) to avoid clashing with the tree index \(b\)):

\[\beta(I) = \frac{\sum_t (t-\bar t)(x_t-\bar x)}{\sum_t (t-\bar t)^2}\]
  3. Concatenate all features into a vector \(\phi_b(x) \in \mathbb{R}^{3K}\).

  4. Fit a decision tree regressor \(f_b\) on \((\phi_b(x_i), y_i)\).

The forest predicts:

\[\hat y(x) = \frac{1}{B}\sum_{b=1}^B f_b(\phi_b(x))\]

min_interval controls the shortest allowed interval length.

  • Small min_interval captures spikes/local shape, but features can be noisy.

  • Large min_interval emphasizes slower patterns (level/trend fragments), but can miss sharp events.

import numpy as np
import pandas as pd

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio

from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Plotly setup: renderer is overridable via env var (useful for headless/CI runs).
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"

# Global RNG for the synthetic-data cells below (fixed seed => reproducible notebook).
rng = np.random.default_rng(7)

# Record the library versions this notebook was executed with.
import numpy, pandas, sklearn, plotly
print("numpy:", numpy.__version__)
print("pandas:", pandas.__version__)
print("sklearn:", sklearn.__version__)
print("plotly:", plotly.__version__)
numpy: 1.26.2
pandas: 2.1.3
sklearn: 1.6.0
plotly: 6.5.2
def _as_3d_panel(X: np.ndarray) -> np.ndarray:
    """Accept (n, m) or (n, d, m). Return (n, d, m)."""
    X = np.asarray(X, dtype=float)
    if X.ndim == 2:
        return X[:, None, :]
    if X.ndim == 3:
        return X
    raise ValueError(f"X must be 2D or 3D, got shape={X.shape}")


def _random_intervals(
    n_timepoints: int,
    n_intervals: int,
    *,
    min_length: int,
    max_length: int | None,
    rng: np.random.Generator,
) -> list[tuple[int, int]]:
    n_timepoints = int(n_timepoints)
    min_length = max(2, int(min_length))

    if max_length is None:
        max_length = n_timepoints
    max_length = int(min(max_length, n_timepoints))

    if min_length > max_length:
        raise ValueError("min_length cannot exceed max_length")

    intervals: list[tuple[int, int]] = []
    for _ in range(int(n_intervals)):
        length = int(rng.integers(min_length, max_length + 1))
        start = int(rng.integers(0, n_timepoints - length + 1))
        end = start + length
        intervals.append((start, end))
    return intervals


def _f_mean(seg2d: np.ndarray) -> np.ndarray:
    return np.mean(seg2d, axis=1)


def _f_std(seg2d: np.ndarray) -> np.ndarray:
    return np.std(seg2d, axis=1, ddof=0)


def _f_slope(seg2d: np.ndarray) -> np.ndarray:
    """Least-squares slope vs time index within the interval."""
    seg2d = np.asarray(seg2d, dtype=float)
    n, L = seg2d.shape
    t = np.arange(L, dtype=float)
    t_mean = float(t.mean())
    denom = float(np.sum((t - t_mean) ** 2))
    if denom == 0.0:
        return np.zeros(n, dtype=float)
    x_mean = np.mean(seg2d, axis=1)
    num = np.sum((t[None, :] - t_mean) * (seg2d - x_mean[:, None]), axis=1)
    return num / denom


def _transform_intervals(
    X3: np.ndarray,
    intervals: list[tuple[int, int]],
) -> np.ndarray:
    """Turn panel (n, d, m) into tabular features for the given intervals."""
    X3 = _as_3d_panel(X3)
    n, d, m = X3.shape

    feats = []
    for dim in range(d):
        x = X3[:, dim, :]
        for (start, end) in intervals:
            seg = x[:, start:end]
            feats.append(_f_mean(seg))
            feats.append(_f_std(seg))
            feats.append(_f_slope(seg))

    return np.column_stack(feats)
class TimeSeriesForestRegressor:
    """Time-Series Forest (TSF) regressor (interval features + tree ensemble).

    This is a compact educational implementation with a practical API.

    Each tree draws its own random intervals, computes (mean, std, slope)
    features on every interval of every dimension via `_transform_intervals`,
    and fits a `DecisionTreeRegressor` on the resulting table. Prediction is
    the unweighted average of the per-tree predictions.

    Parameters
    ----------
    n_estimators : int
        Number of trees.
    n_intervals : int | "sqrt"
        Intervals sampled per tree. "sqrt" uses int(sqrt(m)).
    min_interval : int
        Minimum interval length.
    max_interval : int | None
        Maximum interval length (default: m).
    max_depth, min_samples_leaf : DecisionTreeRegressor params
        Passed through to each tree.
    random_state : int | None
        Reproducible interval sampling + trees.

    Attributes
    ----------
    estimators_ : list[DecisionTreeRegressor] | None
        Fitted trees; None until `fit` is called.
    intervals_ : list[list[tuple[int, int]]] | None
        Per-tree interval lists, aligned with `estimators_`.
    n_timepoints_, n_dims_ : int | None
        Panel shape seen in `fit`; used to validate inputs at predict time.
    """

    def __init__(
        self,
        *,
        n_estimators: int = 200,
        n_intervals: int | str = "sqrt",
        min_interval: int = 3,
        max_interval: int | None = None,
        max_depth: int | None = None,
        min_samples_leaf: int = 1,
        random_state: int | None = None,
    ) -> None:
        self.n_estimators = int(n_estimators)
        self.n_intervals = n_intervals
        self.min_interval = int(min_interval)
        self.max_interval = max_interval
        self.max_depth = max_depth
        self.min_samples_leaf = int(min_samples_leaf)
        self.random_state = random_state

        # Learned state (populated by fit); None marks the model as unfitted.
        self.estimators_: list[DecisionTreeRegressor] | None = None
        self.intervals_: list[list[tuple[int, int]]] | None = None
        self.n_timepoints_: int | None = None
        self.n_dims_: int | None = None

    def _resolve_n_intervals(self, m: int) -> int:
        """Turn the `n_intervals` setting into a concrete count for series length m."""
        if isinstance(self.n_intervals, str):
            if self.n_intervals != "sqrt":
                raise ValueError("n_intervals must be int or 'sqrt'")
            return max(1, int(np.sqrt(m)))
        return int(self.n_intervals)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "TimeSeriesForestRegressor":
        """Fit the forest on panel X (n, m) or (n, d, m) and 1D targets y."""
        X3 = _as_3d_panel(X)
        y = np.asarray(y, dtype=float)
        if y.ndim != 1:
            raise ValueError("y must be 1D")
        if X3.shape[0] != y.shape[0]:
            raise ValueError("X and y must have the same number of samples")

        n, d, m = X3.shape
        self.n_timepoints_ = int(m)
        self.n_dims_ = int(d)

        n_intervals = self._resolve_n_intervals(m)

        # Master RNG spawns one child seed per tree, so refitting with the
        # same random_state reproduces both intervals and trees exactly.
        rng = np.random.default_rng(self.random_state)
        self.estimators_ = []
        self.intervals_ = []

        for _ in range(self.n_estimators):
            seed = int(rng.integers(0, 2**32 - 1))
            tree_rng = np.random.default_rng(seed)

            intervals = _random_intervals(
                n_timepoints=m,
                n_intervals=n_intervals,
                min_length=self.min_interval,
                max_length=self.max_interval,
                rng=tree_rng,
            )
            Phi = _transform_intervals(X3, intervals)

            # The same seed also fixes the tree's internal randomness.
            tree = DecisionTreeRegressor(
                random_state=seed,
                max_depth=self.max_depth,
                min_samples_leaf=self.min_samples_leaf,
            )
            tree.fit(Phi, y)

            self.estimators_.append(tree)
            self.intervals_.append(intervals)

        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict by averaging per-tree predictions; X must match the fitted panel shape."""
        if self.estimators_ is None or self.intervals_ is None or self.n_timepoints_ is None:
            raise ValueError("Model is not fitted")

        X3 = _as_3d_panel(X)
        if X3.shape[1] != int(self.n_dims_):
            raise ValueError("X has different n_dims than seen in fit")
        if X3.shape[2] != int(self.n_timepoints_):
            raise ValueError("X has different n_timepoints than seen in fit")

        preds = []
        # Each tree sees the features built from ITS OWN interval set.
        for tree, intervals in zip(self.estimators_, self.intervals_):
            Phi = _transform_intervals(X3, intervals)
            preds.append(tree.predict(Phi))

        return np.mean(np.vstack(preds), axis=0)
# --- Synthetic daily series (trend + weekly + monthly seasonality + autocorrelated noise) ---

def simulate_ar1_noise(n: int, *, phi: float, sigma: float, rng: np.random.Generator) -> np.ndarray:
    """Simulate an AR(1) noise path: u[t] = phi * u[t-1] + eps[t], with u[0] = 0.

    eps is iid N(0, sigma^2) from `rng`; one draw is made per timestep (the
    t=0 draw is consumed but unused, keeping the RNG stream layout stable).
    """
    shocks = rng.normal(0.0, sigma, size=n)
    path = np.zeros(n)
    prev = 0.0
    for step, shock in enumerate(shocks):
        if step:
            prev = phi * prev + shock
            path[step] = prev
    return path


# Daily index, ~2 years of data.
n = 700
idx = pd.date_range("2022-01-01", periods=n, freq="D")
t = np.arange(n)

# Weekly and ~monthly seasonal components plus a slow linear trend.
weekly = 2.0 * np.sin(2 * np.pi * t / 7) + 0.5 * np.cos(2 * np.pi * t / 7)
monthly = 1.3 * np.sin(2 * np.pi * t / 30) - 0.4 * np.cos(2 * np.pi * t / 30)
trend = 0.02 * t
# AR(1) noise makes the errors autocorrelated, like most real series.
noise = simulate_ar1_noise(n, phi=0.6, sigma=0.7, rng=rng)

y = 50.0 + trend + weekly + monthly + noise
y = pd.Series(y, index=idx, name="y")

# Quick look at the raw series.
fig = go.Figure()
fig.add_trace(go.Scatter(x=y.index, y=y.values, name="y", line=dict(color="black")))
fig.update_layout(title="Synthetic series", xaxis_title="date", yaxis_title="value", height=420)
fig.show()
def make_sliding_windows(y: np.ndarray, window_length: int) -> tuple[np.ndarray, np.ndarray]:
    """Build (X, y_next) pairs for one-step-ahead supervised learning.

    Row i of X is y[i : i + window_length] and its target is
    y[i + window_length]; the series must have at least window_length + 1
    points so every window has a successor.
    """
    series = np.asarray(y, dtype=float)
    win = int(window_length)
    if win < 2:
        raise ValueError("window_length must be >= 2")
    if series.size <= win:
        raise ValueError("y is too short for the chosen window_length")

    windows = np.lib.stride_tricks.sliding_window_view(series, win)
    # Drop the last window — it has no "next" observation to predict —
    # and copy so the result is writable rather than a strided view.
    return windows[:-1].copy(), series[win:]


# Each input row holds the previous L observations; the target is the next value.
L = 60
X, y_next = make_sliding_windows(y.to_numpy(), window_length=L)

# time-aware split: hold out the last h targets (no shuffling => no leakage)
h = 120
X_train, y_train = X[:-h], y_next[:-h]
X_test, y_test = X[-h:], y_next[-h:]

t_test = y.index[-h:]
X_train.shape, X_test.shape
((520, 60), (120, 60))
# Fit the forest on the training windows; min_interval=5 favors slightly
# smoother interval features, min_samples_leaf=2 mildly regularizes each tree.
tsf = TimeSeriesForestRegressor(
    n_estimators=250,
    n_intervals="sqrt",
    min_interval=5,
    max_interval=None,
    max_depth=None,
    min_samples_leaf=2,
    random_state=7,
)

tsf.fit(X_train, y_train)
y_pred = tsf.predict(X_test)

# Holdout metrics; RMSE via sqrt(MSE) for sklearn-version portability.
mae = float(mean_absolute_error(y_test, y_pred))
rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
r2 = float(r2_score(y_test, y_pred))

print("MAE:", mae)
print("RMSE:", rmse)
print("R^2:", r2)
MAE: 2.0957017770743134
RMSE: 2.589710861221658
R^2: -0.506010971830327
# Overlay one-step-ahead predictions on the actuals over the holdout period.
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_test, name="actual", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_test, y=y_pred, name="pred (1-step)", line=dict(color="#4E79A7")))
fig.update_layout(title="TimeSeriesForestRegressor one-step-ahead predictions", xaxis_title="date", yaxis_title="value")
fig.show()
# Visual intuition: show some random intervals used by one tree over one input window
tree_idx = 0
intervals = tsf.intervals_[tree_idx]

# Use the most recent test window as the example series.
window = X_test[-1]
x_axis = np.arange(window.size)

fig = go.Figure()
fig.add_trace(go.Scatter(x=x_axis, y=window, name="window", line=dict(color="black")))

# Shade the first 12 intervals; x1 = e - 1 because (s, e) is half-open.
for (s, e) in intervals[:12]:
    fig.add_vrect(x0=s, x1=e - 1, fillcolor="rgba(78,121,167,0.10)", line_width=0)

fig.update_layout(
    title=f"Example random intervals (tree {tree_idx}) over one input window",
    xaxis_title="lag index in window",
    yaxis_title="value",
)
fig.show()